package cloud.xiguapi.ubas.analysis.market.model;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 * Window function that wraps a per-key count into a {@link ChannelMarketingResult}.
 *
 * <p>For each fired window it reads the channel from field 0 of the key tuple, formats the
 * window end timestamp as {@code yyyy-MM-dd}, takes the first element of the window's input
 * as the count, and emits a single result.
 *
 * @author 大大大西西瓜皮🍉
 * date: 2021-05-19 10:23 AM
 */
public class ChannelMarketingCountResult extends ProcessWindowFunction<Long, ChannelMarketingResult, Tuple, TimeWindow> {

    /**
     * Formatter for the window end. {@link DateTimeFormatter} is immutable and thread-safe,
     * so one shared instance replaces the formatter that used to be allocated on every call.
     */
    private static final DateTimeFormatter WINDOW_END_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /**
     * Emits one {@link ChannelMarketingResult} for the fired window.
     *
     * @param tuple    key of the window; field 0 is read as the channel name
     * @param context  window context, used to obtain the window end timestamp
     * @param elements window input; only the first element is read and used as the count
     *                 (presumably a single pre-aggregated value from upstream — verify against caller)
     * @param out      collector that receives the result
     * @throws Exception declared by the overridden method
     */
    @Override
    public void process(Tuple tuple, Context context, Iterable<Long> elements, Collector<ChannelMarketingResult> out) throws Exception {
        // Extract the fields of the result.
        String channel = tuple.getField(0);

        // The window end is epoch milliseconds; render it in the JVM's default time zone.
        // The zone is resolved on each call so a changed default is still honoured.
        // NOTE(review): only the calendar date is kept, so windows ending on the same day
        // yield the same windowEnd string — confirm this is intended.
        String windowEnd = WINDOW_END_FORMAT.format(
                Instant.ofEpochMilli(context.window().getEnd()).atZone(ZoneId.systemDefault()));

        Long count = elements.iterator().next();

        ChannelMarketingResult result = ChannelMarketingResult.builder()
                .channel(channel)
                .windowEnd(windowEnd)
                .cnt(count)
                .build();
        out.collect(result);
    }
}
